{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Copyright (c) Microsoft Corporation. All rights reserved.  \n",
        "Licensed under the MIT License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/MachineLearningNotebooks/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-with-data-dependency-steps.png)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Azure Machine Learning Pipelines with Data Dependency\n",
        "In this notebook, we will see how we can build a pipeline with implicit data dependency."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prerequisites and Azure Machine Learning Basics\n",
        "If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, make sure you go through the [configuration Notebook](https://aka.ms/pl-config) first if you haven't. This sets you up with a working config file that has information on your workspace, subscription id, etc. \n",
        "\n",
        "### Azure Machine Learning and Pipeline SDK-specific Imports"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import azureml.core\n",
        "from azureml.core import Workspace, Experiment, Datastore\n",
        "from azureml.core.compute import AmlCompute\n",
        "from azureml.core.compute import ComputeTarget\n",
        "from azureml.widgets import RunDetails\n",
        "\n",
        "# Check core SDK version number\n",
        "print(\"SDK version:\", azureml.core.VERSION)\n",
        "\n",
        "from azureml.data.data_reference import DataReference\n",
        "from azureml.pipeline.core import Pipeline, PipelineData\n",
        "from azureml.pipeline.steps import PythonScriptStep\n",
        "print(\"Pipeline SDK-specific imports completed\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Initialize Workspace\n",
        "\n",
        "Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "tags": [
          "create workspace"
        ]
      },
      "outputs": [],
      "source": [
        "ws = Workspace.from_config()\n",
        "print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\\n')\n",
        "\n",
        "# Default datastore (Azure blob storage)\n",
        "# def_blob_store = ws.get_default_datastore()\n",
        "def_blob_store = Datastore(ws, \"workspaceblobstore\")\n",
        "print(\"Blobstore's name: {}\".format(def_blob_store.name))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Source Directory\n",
        "The best practice is to use separate folders for scripts and its dependent files for each step and specify that folder as the `source_directory` for the step. This helps reduce the size of the snapshot created for the step (only the specific folder is snapshotted). Since changes in any files in the `source_directory` would trigger a re-upload of the snapshot, this helps keep the reuse of the step when there are no changes in the `source_directory` of the step."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# source directory\n",
        "source_directory = 'data_dependency_run_train'\n",
        "    \n",
        "print('Sample scripts will be created in {} directory.'.format(source_directory))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Required data and script files for the the tutorial\n",
        "Sample files required to finish this tutorial are already copied to the project folder specified above. Even though the .py provided in the samples don't have much \"ML work,\" as a data scientist, you will work on this extensively as part of your work. To complete this tutorial, the contents of these files are not very important. The one-line files are for demostration purpose only."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Compute Targets\n",
        "See the list of Compute Targets on the workspace."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "cts = ws.compute_targets\n",
        "for ct in cts:\n",
        "    print(ct)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Retrieve or create an Aml compute\n",
        "Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's get the default Aml Compute in the current workspace. We will then run the training script on this compute target.\n",
        "\n",
        "> Note that if you have an AzureML Data Scientist role, you will not have permission to create compute resources. Talk to your workspace or IT admin to create the compute targets described in this section, if they do not already exist."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.compute_target import ComputeTargetException\n",
        "\n",
        "aml_compute_target = \"cpu-cluster\"\n",
        "try:\n",
        "    aml_compute = AmlCompute(ws, aml_compute_target)\n",
        "    print(\"found existing compute target.\")\n",
        "except ComputeTargetException:\n",
        "    print(\"creating new compute target\")\n",
        "    \n",
        "    provisioning_config = AmlCompute.provisioning_configuration(vm_size = \"STANDARD_D2_V2\",\n",
        "                                                                min_nodes = 1, \n",
        "                                                                max_nodes = 4)    \n",
        "    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)\n",
        "    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)\n",
        "    \n",
        "print(\"Aml Compute attached\")\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# For a more detailed view of current Azure Machine Learning Compute status, use get_status()\n",
        "# example: un-comment the following line.\n",
        "# print(aml_compute.get_status().serialize())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**\n",
        "\n",
        "Now that you have created the compute target, let's see what the workspace's compute_targets() function returns. You should now see one entry named 'amlcompute' of type AmlCompute."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Building Pipeline Steps with Inputs and Outputs\n",
        "As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline.\n",
        "\n",
        "### Datasources\n",
        "Datasource is represented by **[DataReference](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py)** object and points to data that lives in or is accessible from Datastore. DataReference could be a pointer to a file or a directory."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Reference the data uploaded to blob storage using DataReference\n",
        "# Assign the datasource to blob_input_data variable\n",
        "\n",
        "# DataReference(datastore, \n",
        "#               data_reference_name=None, \n",
        "#               path_on_datastore=None, \n",
        "#               mode='mount', \n",
        "#               path_on_compute=None, \n",
        "#               overwrite=False)\n",
        "\n",
        "blob_input_data = DataReference(\n",
        "    datastore=def_blob_store,\n",
        "    data_reference_name=\"test_data\",\n",
        "    path_on_datastore=\"titanic/Titanic.csv\")\n",
        "print(\"DataReference object created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Intermediate/Output Data\n",
        "Intermediate data (or output of a Step) is represented by **[PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py)** object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.\n",
        "\n",
        "#### Constructing PipelineData\n",
        "- **name:** [*Required*] Name of the data item within the pipeline graph\n",
        "- **datastore_name:** Name of the Datastore to write this output to\n",
        "- **output_name:** Name of the output\n",
        "- **output_mode:** Specifies \"upload\" or \"mount\" modes for producing output (default: mount)\n",
        "- **output_path_on_compute:** For \"upload\" mode, the path to which the module writes this output during execution\n",
        "- **output_overwrite:** Flag to overwrite pre-existing data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Define intermediate data using PipelineData\n",
        "# Syntax\n",
        "\n",
        "# PipelineData(name, \n",
        "#              datastore=None, \n",
        "#              output_name=None, \n",
        "#              output_mode='mount', \n",
        "#              output_path_on_compute=None, \n",
        "#              output_overwrite=None, \n",
        "#              data_type=None, \n",
        "#              is_directory=None)\n",
        "\n",
        "# Naming the intermediate data as processed_data1 and assigning it to the variable processed_data1.\n",
        "processed_data1 = PipelineData(\"processed_data1\",datastore=def_blob_store, is_directory=True)\n",
        "print(\"PipelineData object created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Pipelines steps using datasources and intermediate data\n",
        "Machine learning pipelines can have many steps and these steps could use or reuse datasources and intermediate data. Here's how we construct such a pipeline:"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes a datasource and produces intermediate data.\n",
        "In this step, we define a step that consumes a datasource and produces intermediate data.\n",
        "\n",
        "**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** "
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Specify conda dependencies and a base docker image through a RunConfiguration\n",
        "\n",
        "This step uses a docker image and scikit-learn, use a [**RunConfiguration**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py) to specify these requirements and use when creating the PythonScriptStep. "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from azureml.core.runconfig import RunConfiguration\n",
        "from azureml.core.conda_dependencies import CondaDependencies\n",
        "from azureml.core.runconfig import DEFAULT_CPU_IMAGE\n",
        "\n",
        "# create a new runconfig object\n",
        "run_config = RunConfiguration()\n",
        "\n",
        "# enable Docker \n",
        "run_config.environment.docker.enabled = True\n",
        "\n",
        "# set Docker base image to the default CPU-based image\n",
        "run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE\n",
        "\n",
        "# use conda_dependencies.yml to create a conda environment in the Docker image for execution\n",
        "run_config.environment.python.user_managed_dependencies = False\n",
        "\n",
        "# specify CondaDependencies obj\n",
        "run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['scikit-learn'])"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# step4 consumes the datasource (Datareference) in the previous step\n",
        "# and produces processed_data1\n",
        "trainStep = PythonScriptStep(\n",
        "    script_name=\"train.py\", \n",
        "    arguments=[\"--input_data\", blob_input_data, \"--output_train\", processed_data1],\n",
        "    inputs=[blob_input_data],\n",
        "    outputs=[processed_data1],\n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory,\n",
        "    runconfig=run_config\n",
        ")\n",
        "print(\"trainStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes intermediate data and produces intermediate data\n",
        "In this step, we define a step that consumes an intermediate data and produces intermediate data.\n",
        "\n",
        "**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** "
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# step5 to use the intermediate data produced by step4\n",
        "# This step also produces an output processed_data2\n",
        "processed_data2 = PipelineData(\"processed_data2\", datastore=def_blob_store, is_directory=True)\n",
        "source_directory = \"data_dependency_run_extract\"\n",
        "\n",
        "extractStep = PythonScriptStep(\n",
        "    script_name=\"extract.py\",\n",
        "    arguments=[\"--input_extract\", processed_data1, \"--output_extract\", processed_data2],\n",
        "    inputs=[processed_data1],\n",
        "    outputs=[processed_data2],\n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory)\n",
        "print(\"extractStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Define a Step that consumes intermediate data and existing data and produces intermediate data\n",
        "In this step, we define a step that consumes multiple data types and produces intermediate data.\n",
        "\n",
        "This step uses the output generated from the previous step as well as existing data on a DataStore. The location of the existing data is specified using a [**PipelineParameter**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelineparameter?view=azure-ml-py) and a [**DataPath**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.datapath.datapath?view=azure-ml-py). Using a PipelineParameter enables easy modification of the data location when the Pipeline is published and resubmitted.\n",
        "\n",
        "**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Reference the data uploaded to blob storage using a PipelineParameter and a DataPath\n",
        "from azureml.pipeline.core import PipelineParameter\n",
        "from azureml.data.datapath import DataPath, DataPathComputeBinding\n",
        "\n",
        "datapath = DataPath(datastore=def_blob_store, path_on_datastore='titanic/Titanic.csv')\n",
        "datapath_param = PipelineParameter(name=\"compare_data\", default_value=datapath)\n",
        "data_parameter1 = (datapath_param, DataPathComputeBinding(mode='mount'))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Now define the compare step which takes two inputs and produces an output\n",
        "processed_data3 = PipelineData(\"processed_data3\", datastore=def_blob_store, is_directory=True)\n",
        "source_directory = \"data_dependency_run_compare\"\n",
        "\n",
        "compareStep = PythonScriptStep(\n",
        "    script_name=\"compare.py\",\n",
        "    arguments=[\"--compare_data1\", data_parameter1, \"--compare_data2\", processed_data2, \"--output_compare\", processed_data3],\n",
        "    inputs=[data_parameter1, processed_data2],\n",
        "    outputs=[processed_data3],    \n",
        "    compute_target=aml_compute, \n",
        "    source_directory=source_directory)\n",
        "print(\"compareStep created\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Build the pipeline"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline1 = Pipeline(workspace=ws, steps=[compareStep])\n",
        "print (\"Pipeline is built\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run1 = Experiment(ws, 'Data_dependency_sample').submit(pipeline1)\n",
        "print(\"Pipeline is submitted for execution\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "RunDetails(pipeline_run1).show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "#### Wait for pipeline run to complete"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pipeline_run1.wait_for_completion(show_output=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### See Outputs\n",
        "\n",
        "See where outputs of each pipeline step are located on your datastore.\n",
        "\n",
        "***Wait for pipeline run to complete, to make sure all the outputs are ready***"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Get Steps\n",
        "for step in pipeline_run1.get_steps():\n",
        "    print(\"Outputs of step \" + step.name)\n",
        "    \n",
        "    # Get a dictionary of StepRunOutputs with the output name as the key \n",
        "    output_dict = step.get_outputs()\n",
        "    \n",
        "    for name, output in output_dict.items():\n",
        "        \n",
        "        output_reference = output.get_port_data_reference() # Get output port data reference\n",
        "        print(\"\\tname: \" + name)\n",
        "        print(\"\\tdatastore: \" + output_reference.datastore_name)\n",
        "        print(\"\\tpath on datastore: \" + output_reference.path_on_datastore)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Download Outputs\n",
        "\n",
        "We can download the output of any step to our local machine using the SDK."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# Retrieve the step runs by name 'train.py'\n",
        "train_step = pipeline_run1.find_step_run('train.py')\n",
        "\n",
        "if train_step:\n",
        "    train_step_obj = train_step[0] # since we have only one step by name 'train.py'\n",
        "    train_step_obj.get_output_data('processed_data1').download(\"./outputs\") # download the output to current directory"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Next: Publishing the Pipeline and calling it from the REST endpoint\n",
        "See this [notebook](https://aka.ms/pl-pub-rep) to understand how the pipeline is published and you can call the REST endpoint to run the pipeline."
      ]
    }
  ],
  "metadata": {
    "authors": [
      {
        "name": "sanpil"
      }
    ],
    "category": "tutorial",
    "compute": [
      "AML Compute"
    ],
    "datasets": [
      "Custom"
    ],
    "deployment": [
      "None"
    ],
    "exclude_from_index": false,
    "framework": [
      "Azure ML"
    ],
    "friendly_name": "Azure Machine Learning Pipelines with Data Dependency",
    "kernelspec": {
      "display_name": "Python 3.8 - AzureML",
      "language": "python",
      "name": "python38-azureml"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.7"
    },
    "order_index": 2,
    "star_tag": [
      "featured"
    ],
    "tags": [
      "None"
    ],
    "task": "Demonstrates how to construct a Pipeline with data dependency between steps"
  },
  "nbformat": 4,
  "nbformat_minor": 2
}